package com.wstuo.common.activemq.service;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.management.openmbean.CompositeData;

import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.web.RemoteJMXBrokerFacade;
import org.apache.activemq.web.config.SystemPropertiesConfiguration;
import org.apache.log4j.Logger;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.transaction.annotation.Transactional;

import com.wstuo.common.activemq.dao.IQueueConfigureDAO;
import com.wstuo.common.activemq.dto.ActiveMQInfoDTO;
import com.wstuo.common.activemq.dto.ActiveMQMessageInfoDTO;
import com.wstuo.common.activemq.dto.QueueConfigureDTO;
import com.wstuo.common.activemq.entity.QueueConfigure;
import com.wstuo.common.util.TimeUtils;

/**
 * 查看队列信息Service
 * 
 * @author WSTUO
 */
public class ActiveMQService implements IActiveMQService {
	final static Logger LOGGER = Logger.getLogger(ActiveMQService.class);
	@Autowired
	private IQueueConfigureDAO queueConfigureDAO;
	/**
	 * ip地址常量
	 */
	private final String IPADDRESS = "ipAddress";
	/**
	 * 查看详细的地址常量
	 */
	private final String LOOKINFOPORT = "lookInfoPort";

	public Properties getconfig(){
		InputStream inputStream = null;
		Properties p = null;
		try {
			p =new Properties();
			inputStream = this.getClass().getClassLoader().getResourceAsStream("activemq.properties");
			p.load(inputStream);
			return p;
		} catch (IOException io) {
			LOGGER.error(io);
		}finally{
			if(inputStream!=null){
				try {
					inputStream.close();
				} catch (IOException io) {
					LOGGER.error(io);
				}
			}
		}
		return p;
	}

	/**
	 * 连接MQ服务
	 * 
	 * @return RemoteJMXBrokerFacade
	 */
	@Transactional
	private RemoteJMXBrokerFacade queueServer() {
		Properties p= getconfig();
		RemoteJMXBrokerFacade createConnector = new RemoteJMXBrokerFacade();
		System.setProperty("java.rmi.server.hostname", p.getProperty(IPADDRESS));
		
		StringBuffer webconsoleJmxUrl = new StringBuffer();
		webconsoleJmxUrl.append( "service:jmx:rmi:///jndi/rmi://" ).append( p.getProperty(IPADDRESS) )
		.append( ":" ).append( p.getProperty(LOOKINFOPORT) ).append( "/jmxrmi" );
		
		System.setProperty("webconsole.jmx.url", webconsoleJmxUrl.toString());
		System.setProperty("webconsole.jmx.user", "");
		System.setProperty("webconsole.jmx.password", "");
		// 创建配置
		SystemPropertiesConfiguration configuration = new SystemPropertiesConfiguration();
		// 创建链接
		createConnector.setConfiguration(configuration);
		return createConnector;
	}

	/**
	 * 获取全部队列
	 * 
	 * @return List<ActiveMQInfoDTO>
	 */
	@Transactional
	public List<ActiveMQInfoDTO> showQueues() {
		List<ActiveMQInfoDTO> li = new ArrayList<ActiveMQInfoDTO>();
		RemoteJMXBrokerFacade createConnector = queueServer();
		Collection<QueueViewMBean> queueViewList = null;
		try {
			queueViewList = createConnector.getQueues();
			for (QueueViewMBean queueViewMBean : queueViewList) {// 遍历队列获取队列详细信息
				ActiveMQInfoDTO dto = new ActiveMQInfoDTO();
				BeanUtils.copyProperties(queueViewMBean, dto);
				dto.setQueueName(queueViewMBean.getName());
				li.add(dto);
			}
		} catch (Exception e) {
			LOGGER.error(e);
		} finally {
			if (null != createConnector) {
				createConnector.shutdown();
			}
		}
		return li;
	}

	/**
	 * 获取队列中的详细信息
	 * 
	 * @param queueName
	 *            队列名称
	 * @return List<ActiveMQMessageInfoDTO>
	 */
	@Transactional
	public List<ActiveMQMessageInfoDTO> showQueueinfo(String queueName) {
		List<ActiveMQMessageInfoDTO> lis = new ArrayList<ActiveMQMessageInfoDTO>();
		RemoteJMXBrokerFacade createConnector = queueServer();
		QueueViewMBean queueViewMBean = null;
		try {
			queueViewMBean = createConnector.getQueue(queueName);
			CompositeData[] datas = queueViewMBean.browse();
			loopArrayToList(lis, datas);
		} catch (Exception e) {
			LOGGER.error(e);
		} finally {
			if (null != createConnector) {
				createConnector.shutdown();
			}
		}
		return lis;
	}

	private void loopArrayToList(List<ActiveMQMessageInfoDTO> lis,
			CompositeData[] datas) {
		for (int i = 0; i < datas.length; i++) {
			Map<String, String> map = new HashMap<String, String>();
			ActiveMQMessageInfoDTO message = new ActiveMQMessageInfoDTO();
			CompositeData data = datas[i];
			// mq中消息数据存储方式为key-value形式，
			// 获取消息内容可根据key来取得，不同的消息类型有不同的key: Text Message key - Text; Map
			message.setMessageID((String) data.get("JMSMessageID"));
			message.setPriority((Integer) data.get("JMSPriority"));
			message.setTimestamp(TimeUtils.format((Date) data.get("JMSTimestamp"),TimeUtils.DATETIME_PATTERN));
			message.setRedelivered((Boolean) data.get("JMSRedelivered"));
			String mqInfo = (String) data.get("PropertiesText");// 获取保存在MQ的map
			String[] maps = mqInfo.split(",");
			for (String m : maps) {
				String[] values = m.split("=");
				map.put(values[0].trim(), values[1]);
			}
			message.setMqTitle(map.get("mqTitle"));
			message.setMqContent(map.get("mqContent"));
			String creator = map.get("mqCreator").replaceAll("}", "");
			message.setMqCreator(creator);
			lis.add(message);
		}
	}

	/**
	 * 删除消息
	 * 
	 * @param messageId
	 * @param queueName
	 * @return boolean
	 */
	@Transactional
	public boolean deleteMessage(String messageId, String queueName) {
		boolean result = false;
		RemoteJMXBrokerFacade createConnector = queueServer();
		QueueViewMBean queueViewMBean = null;
		try {
			queueViewMBean = createConnector.getQueue(queueName);
			result = queueViewMBean.removeMessage(messageId);
		} catch (Exception e) {
			LOGGER.error(e);
		}finally {
			if (null != createConnector) {
				createConnector.shutdown();
			}
		}
		return result;
	}

	/**
	 * 查询队列执行状态
	 * 
	 * @param queueName
	 * @return boolean
	 */
	@Transactional
	public boolean findQueueStatus(String queueName) {
		boolean result = false;
		RemoteJMXBrokerFacade createConnector = queueServer();
		QueueViewMBean queueViewMBean = null;
		try {
			queueViewMBean = createConnector.getQueue(queueName);
			if (queueViewMBean.getDispatchCount() == queueViewMBean
					.getDequeueCount()) {// 根据入队个数和当前出队数来判断状态
				result = true;
			}
		} catch (Exception e) {
			LOGGER.error(e.getMessage());
		} finally {
			if (null != createConnector) {
				createConnector.shutdown();
			}
		}
		return result;
	}

	/**
	 * 根据ID查询配置信息
	 * 
	 * @return QueueConfigureDTO
	 */
	@Transactional
	public QueueConfigureDTO findQueueConfigure() {
		List<QueueConfigure> list = queueConfigureDAO.findAll();
		QueueConfigureDTO dto = new QueueConfigureDTO();
		if (list != null && list.size() > 0) {
			QueueConfigure entity = list.get(0);
			QueueConfigureDTO.entity2dto(entity, dto);
		}
		return dto;
	}

	/**
	 * 保存队列参数配置信息
	 * 
	 * @param dto
	 *            QueueConfigureDTO
	 */
	@Transactional
	public void saveQueueConfigure(QueueConfigureDTO dto) {
		QueueConfigure entity = new QueueConfigure();
		QueueConfigureDTO.dto2entity(dto, entity);
		queueConfigureDAO.merge(entity);
	}
	/**
	 * MQ接收消息转换方法
	 * @param msg
	 * @return
	 * @throws JMSException
	 * @throws MessageConversionException
	 */
	@SuppressWarnings({ "rawtypes" })
	public Object fromMessage(Message msg) throws JMSException,MessageConversionException {
		Object obj =null;
		if (msg instanceof ObjectMessage) {
			HashMap map = (HashMap) ((ObjectMessage) msg).getObjectProperty("Map");
			try {
				// Order,Order,Product must implements Seralizable
				ByteArrayInputStream bis = new ByteArrayInputStream(
						(byte[]) map.get("User"));
				ObjectInputStream ois = new ObjectInputStream(bis);
				obj = ois.readObject();
			} catch (IOException e) {
				LOGGER.error(e);
			} catch (ClassNotFoundException e) {
				LOGGER.error(e);
			}
			return obj;
		} else {
			throw new JMSException("Msg:[" + msg + "] is not Map");
		}
	}
	/**
	 * 发送信息内容转换
	 */
	@SuppressWarnings({ "rawtypes", "unchecked" })
    public Message toMessage(Object obj, Session session) throws JMSException,
		MessageConversionException {
		ActiveMQObjectMessage objMsg = (ActiveMQObjectMessage) session
				.createObjectMessage();
		Map map = new HashMap();
		ByteArrayOutputStream bos = null;
		ObjectOutputStream oos = null;
		try {
			// Order,Order Product must implements Seralizable
		    bos = new ByteArrayOutputStream();
		    oos = new ObjectOutputStream(bos);
			oos.writeObject(obj);
			map.put("User", bos.toByteArray());
			objMsg.setObjectProperty("Map", map);//将一些信息保存进Message的Map属性里
		} catch (IOException e) {
			LOGGER.error(e);
		} finally {
		    try {
                if (bos != null) bos.close();
                if (oos != null) oos.close();
            } catch (IOException e) {
                LOGGER.error(e);
            }
		}
		return objMsg;
	}
}
